 1.大数据高速计算引擎Spark（上）之Spark Core中RDD编程高阶下的广播变量
   
   有时候需要在多个任务之间共享变量，或者在任务（Task）和Driver Program之间
共享变量。为了满足这种需求，Spark提供了两种类型的变量：
      广播变量（broadcast variables）
      累加器（accumulators）
   广播变量、累加器主要作用是为了优化Spark程序。
   广播变量将变量在节点的 Executor 之间进行共享(由Driver广播出去)；
   广播变量用来高效分发较大的对象。向所有工作节点(Executor)发送一个较大的只读
值，以供一个或多个操作使用。
   使用广播变量的过程如下：
       对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T]
对象。 任何可序列化的类型都可以这么实现（在 Driver 端）
       通过 value 属性访问该对象的值（在 Executor 中）
       变量只会被发到各个 Executor 一次，作为只读值处理
   广播变量的相关参数：
       spark.broadcast.blockSize（缺省值：4m）
       spark.broadcast.checksum（缺省值：true）
       spark.broadcast.compress（缺省值：true）
   广播变量的运用（Map Side Join）
package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val conf =
      new SparkConf().setAppName(
        this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 设置本地文件切分大小
    sc.hadoopConfiguration.setLong("fs.local.block.size",
      128*1024*1024)

    // map task：数据准备
    val productRDD: RDD[(String, String)] =
      sc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\lagou_product_info.txt")
      .map { line =>
        val fields = line.split(";")
        (fields(0), line)
      }
    val orderRDD: RDD[(String, String)] =
      sc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\orderinfo.txt",8 )
      .map { line =>
        val fields = line.split(";")
        (fields(2), line)
      }
    // join有shuffle操作
    val resultRDD = productRDD.join(orderRDD)

    println(resultRDD.count())
    Thread.sleep(1000000)
    sc.stop()
  }
}

   执行时间46s，shuffle read 450M

package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapSideJoin {
  def main(args: Array[String]): Unit = {
    val conf =
      new SparkConf().setAppName(
        this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    // 设置本地文件切分大小
    sc.hadoopConfiguration.setLong("fs.local.block.size",
      128 * 1024 * 1024)

    // map task：数据准备
    val productRDD: RDD[(String, String)] =
      sc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\lagou_product_info.txt")
        .map { line =>
          val fields = line.split(";")
          (fields(0), line)
        }
    val productBC = sc.broadcast(productRDD.collectAsMap())

    // map task：完成数据的准备
    val orderRDD: RDD[(String, String)] =
      sc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\orderinfo.txt", 8)
        .map { line =>
          val fields = line.split(";")
          (fields(2), line)
        }
    // join有shuffle操作

    val resultRDD = orderRDD.map { case (pid, orderInfo) =>
      val productInfoMap = productBC.value
      val productInfoString = productInfoMap.getOrElse(pid, null)
      (pid, (productInfoString, orderInfo))
    }

    println(resultRDD.count())
    Thread.sleep(1000000)
    sc.stop()
  }

}
   
 2.累加器
   
   累加器的作用：可以实现一个变量在不同的 Executor 端能保持状态的累加；
   累计器在 Driver 端定义，读取；在 Executor 中完成累加；
   累加器也是 lazy 的，需要 Action 触发；Action触发一次，执行一次，触发多次，执
行多次；
   累加器一个比较经典的应用场景是用来在 Spark Streaming 应用中记录某些事件的数量；

val data = sc.makeRDD(Seq("hadoop map reduce", "spark mllib"))

// 方式1
val count1 = data.flatMap(line => line.split("\\s+")).map(word => 1).reduce(_ + _)
println(count1)

// 方式2。错误的方式
var acc = 0
data.flatMap(line => line.split("\\s+")).foreach(word => acc
+= 1)
println(acc)
// 在Driver中定义变量，每个运行的Task会得到这些变量的一份新的副本，但在
// Task中更新这些副本的值不会影响Driver中对应变量的值
    
	Spark内置了三种类型的累加器，分别是
        LongAccumulator 用来累加整数型
        DoubleAccumulator 用来累加浮点型
        CollectionAccumulator 用来累加集合元素

val data = 
sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive"
.split("\\s+"))
val acc1 = sc.longAccumulator("totalNum1")
val acc2 = sc.doubleAccumulator("totalNum2")
val acc3 = sc.collectionAccumulator[String]("allWords")
val rdd = data.map { word =>
acc1.add(word.length)
acc2.add(word.length)
acc3.add(word)
word
}

rdd.count
rdd.collect

println(acc1.value)
println(acc2.value)
println(acc3.value)